[アップデート] Amazon OpenSearch Ingestion が Amazon Kinesis Data Streams からのデータの取り込みをサポートしました

[アップデート] Amazon OpenSearch Ingestion が Amazon Kinesis Data Streams からのデータの取り込みをサポートしました

リテールアプリ共創部の中野です。

Amazon OpenSearch Ingestion が Amazon Kinesis Data Streams からのデータ取り込みをサポートするようになったというアップデートが公開されました。

https://aws.amazon.com/jp/about-aws/whats-new/2024/11/amazon-opensearch-ingestion-amazon-kinesis-data-streams/

OpenSearch Integration は、OpenSearch Service ドメインへデータの抽出、変換、取り込みを一括で簡単に行えるような機能をサポートしています。
以前から S3 や Kinesis Firehose、DynamoDB などのサービスからの取り込みをサポートしていました。

このアップデートによって、Kinesis Data Streams 経由で大量にストリーミングされるデータをリアルタイム分析したいようなケースでの活用が可能になっています。
アプリケーションのログを OpenSearch でリアルタイム分析したいなどの用途で使えそうですね。

やってみた

0. 全体構成

以下のような構成で OpenSearch Service へ Lambda のアプリケーションログを書き込むような状況を作ってみます。
この構成で OpenSearch Dashboard でログを確認できるかみてみましょう。

Lambda から出力されたアプリケーションログを CloudWatch Logs へ書き込みます。
その後、CloudWatch Logs のサブスクリプションフィルターで Kinesis Data Streams へログデータを送信します。
その後、OpenSearch Integration が Kinesis Data Streams のストリーミングデータを消費して、OpenSearch のインデックスへ書き込みを行います。

最終的にユーザーが OpenSearch のデータを視覚的に確認したい場合は、EC2 の踏み台サーバー経由でセッションマネージャーを利用してアクセスすることで確認できます。

なお、OpenSearch Service のドメインを作成するために手動だとかなり時間がかかってしまいます。
そこで、以下の AWS CDK のサンプルを使わせていただいて、事前に環境を用意しました。

https://github.com/aws-samples/opensearch-vpc-cdk?tab=readme-ov-file

1. Lambda の用意

アプリケーションのログを確認できるように Lambda を準備します。
Lambda のランタイムはなんでもよいですが、今回は Node.js を使いました。
また、適当なログを書き込むために以下のようなソースを用意します。

Capture-2024-11-12-172956.png

export const handler = async (event) => {
  console.error("エラーです");
  return "エラーです";
};

2. Kinesis Data Streams の構築

次に、ログをストリーミングする Kinesis Data Streams をつくります。
オンデマンドモードで作成してシャードを自動スケールするようにしておきます。

Capture-2024-11-12-174429.png
Capture-2024-11-12-174512.png

3. CloudWatch Logs Subscription Filter の設定

サブスクリプションフィルターを作成する前に、事前に CloudWatch Logs から Kinesis Data Streams へ PutRecord を許可する IAM ロールを作成します。
基本的には以下の公式ドキュメントを参考にしました。

https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample

IAM ロールの信頼ポリシーは以下のようにしました。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "logs.ap-northeast-1.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringLike": {
          "aws:SourceArn": "arn:aws:logs:ap-northeast-1:<account-id>:*"
        }
      }
    }
  ]
}

IAM ポリシーは以下です。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kinesis:PutRecord",
            "Resource": "arn:aws:kinesis:ap-northeast-1:<account-id>:stream/kinesis-data-streams-to-open-search"
        }
    ]
}

次に、Lambda から書き込まれたログのサブスクリプションフィルターを設定します。
ロググループ単位でのフィルターを作成します。

Capture-2024-11-12-180508.png

アクセス許可を付与する設定で、さきほど作成した IAM ロールをアタッチします。

Capture-2024-11-12-182437.png

ログ形式はその他にしてそのまま作成します。

スクリーンショット_2024-11-18_0_03_02.png

ここまでで、Lambda から生成されたログを Kinesis Data Streams へ書き込みできるようになったので、テスト実行してみます。
Lambda のテスト実行を行って書き込みを行われていることを確認します。

データビューアーのタブから適当なシャードを選択して、開始位置を水平トリムにしてレコードを取得します。
パーティションキー毎にバイナリデータが書き込まれていれば準備完了です。
ここで、バイナリデータになっているのは gzip 形式で書き込みが行われているためです。

Capture-2024-11-13-135333.png

4. OpenSearch ドメインの構築

それでは、OpenSearch ドメインを構築してきます。
OpenSearch 自体はイチから作成すると設定を選んだり構築完了まで待つのに 20〜30 分程度かかってしまうため、前述で触れた AWS CDK のサンプルコードを利用させていただいて構築します。

package.json のモジュールバージョンは若干古かったため以下を利用しました。

package.json
{
  "name": "opensearch-vpc-cdk",
  "version": "0.1.0",
  "bin": {
    "opensearch-vpc-cdk": "bin/opensearch-vpc-cdk.js"
  },
  "scripts": {
    "build": "tsc",
    "watch": "tsc -w",
    "test": "jest",
    "cdk": "cdk"
  },
  "devDependencies": {
    "@types/jest": "^29.5.14",
    "@types/node": "22.9.0",
    "aws-cdk": "2.167.1",
    "jest": "^29.7.0",
    "ts-jest": "^29.2.5",
    "ts-node": "^10.9.2",
    "typescript": "~5.6.3"
  },
  "dependencies": {
    "@aws-cdk/aws-lambda-python-alpha": "^2.167.1-alpha.0",
    "@aws-sdk/client-iam": "^3.693.0",
    "aws-cdk-lib": "2.167.1",
    "constructs": "^10.4.2",
    "source-map-support": "^0.5.21"
  }
}

AWS CDK のコードは以下の通りです。

bin/opensearch-vpc-cdk.ts
#!/usr/bin/env node
import "source-map-support/register";
import * as cdk from "aws-cdk-lib";
import { OpensearchVpcCdkStack } from "../lib/opensearch-vpc-cdk-stack";

const app = new cdk.App();
new OpensearchVpcCdkStack(app, "OpensearchVpcCdkStack", {});

Lambda のインデクシングするコードは検証で不要でしたので削除しました。
OpenSearch のエンジンバージョンも古かったため、現状の最新である v2.15 を指定しました。

lib/opensearch-vpc-cdk-stack.ts
import * as cdk from "aws-cdk-lib";
import { CfnOutput, RemovalPolicy } from "aws-cdk-lib";
import {
  BastionHostLinux,
  BlockDeviceVolume,
  MachineImage,
  Peer,
  Port,
  SecurityGroup,
  Vpc,
} from "aws-cdk-lib/aws-ec2";
import {
  AnyPrincipal,
  CfnServiceLinkedRole,
  PolicyStatement,
} from "aws-cdk-lib/aws-iam";
import { Domain, EngineVersion } from "aws-cdk-lib/aws-opensearchservice";
import { Construct } from "constructs";
import { IAMClient, ListRolesCommand } from "@aws-sdk/client-iam";

const iam = new IAMClient({});

export class OpensearchVpcCdkStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // VPC
    const vpc = new Vpc(this, "Vpc", {});

    // Security Group
    const bastionSecurityGroup = new SecurityGroup(
      this,
      "BastionSecurityGroup",
      {
        vpc: vpc,
        allowAllOutbound: true,
        securityGroupName: "BastionSecurityGroup",
      }
    );

    const opensearchSecurityGroup = new SecurityGroup(
      this,
      "OpensearchSecurityGroup",
      {
        vpc: vpc,
        securityGroupName: "OpensearchSecurityGroup",
      }
    );

    opensearchSecurityGroup.addIngressRule(bastionSecurityGroup, Port.tcp(443));

    // Service-linked role that Amazon OpenSearch Service will use
    (async () => {
      const response = await iam.send(
        new ListRolesCommand({
          PathPrefix: "/aws-service-role/opensearchservice.amazonaws.com/",
        })
      );

      // Only if the role for OpenSearch Service doesn't exist, it will be created.
      if (response.Roles && response.Roles?.length == 0) {
        new CfnServiceLinkedRole(this, "OpensearchServiceLinkedRole", {
          awsServiceName: "es.amazonaws.com",
        });
      }
    })();

    // Bastion host to access Opensearch Dashboards
    new BastionHostLinux(this, "BastionHost", {
      vpc,
      securityGroup: bastionSecurityGroup,
      machineImage: MachineImage.latestAmazonLinux2023(),
      blockDevices: [
        {
          deviceName: "/dev/xvda",
          volume: BlockDeviceVolume.ebs(10, {
            encrypted: true,
          }),
        },
      ],
    });

    // OpenSearch domain
    const domain = new Domain(this, "Domain", {
      version: EngineVersion.OPENSEARCH_2_15,
      nodeToNodeEncryption: true,
      enforceHttps: true,
      encryptionAtRest: {
        enabled: true,
      },
      vpc: vpc,
      capacity: {
        dataNodes: 2,
      },
      removalPolicy: RemovalPolicy.DESTROY,
      zoneAwareness: {
        enabled: true,
      },
      securityGroups: [opensearchSecurityGroup],
    });

    domain.addAccessPolicies(
      new PolicyStatement({
        principals: [new AnyPrincipal()],
        actions: ["es:ESHttp*"],
        resources: [domain.domainArn + "/*"],
      })
    );
  }
}

モジュールをインストールしてデプロイします。

npm ci
npx cdk bootstrap
npx cdk deploy

OpenSearch ドメインが構築されました。

スクリーンショット_2024-11-17_10_38_59.png

セッションマネージャーで自身のローカル環境から OpenSearch Dashboard にアクセスできますので確認します。
事前にセッションマネージャープラグインも必要なので、ローカルにインストールしていない場合はインストールします。

https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html

--target 部分には踏み台用 EC2 インスタンスのインスタンス ID を入力します。
--parameters の host には OpenSearch の VPC ドメインエンドポイントをいれますが、https は含めずに FQDN のみ入力してください。

aws ssm start-session --target <EC2のインスタンスID> --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["*******.ap-northeast-1.es.amazonaws.com"]}'


Starting session with SessionId: **************
Port 8157 opened for sessionId **************
Waiting for connections...

Waiting for connectionsという文字がでていればセッションとしては確立されているため OpenSearch Dashboard にアクセスできます。

ブラウザから、https://localhost:8157/_dashboardsにアクセスして以下のような画面がでれば OK です。

スクリーンショット 2024-11-17 10.52.56.png

5. OpenSearch Integration のパイプライン構築

最後に、Kinesis Data Streams のデータを取り込んで OpenSearch Service へデータを変換してロードするパイプラインを構築します。

事前に IAM ロールを作成します。

IAM ロールの信頼ポリシーは以下です。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "osis-pipelines.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

一方で、IAM ポリシーは以下のように定義します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "allowReadFromStream",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamConsumer",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:ListStreamConsumers",
        "kinesis:RegisterStreamConsumer",
        "kinesis:SubscribeToShard"
      ],
      "Resource": ["arn:aws:kinesis:ap-northeast-1:<account-id>:stream/*"]
    },
    {
      "Sid": "allowAccessToOS",
      "Effect": "Allow",
      "Action": ["es:DescribeDomain", "es:ESHttp*"],
      "Resource": [
        "arn:aws:es:ap-northeast-1:<account-id>:domain/domain66ac69e0-xwfaqbkvba0m",
        "arn:aws:es:ap-northeast-1:<account-id>:domain/domain66ac69e0-xwfaqbkvba0m/*"
      ]
    }
  ]
}

この IAM ポリシーによって、OpenSearch Integration から Kinesis Data Streams への読み込みアクセスと OpenSearch Integration から OpenSearch ドメインへの読み込み書き込みアクセスが許可されます。

IAM ロールを作成できたら、Integration パイプラインを構築します。

スクリーンショット_2024-11-18_0_46_15.png

スクリーンショット 2024-11-17 10.55.26.png

スクリーンショット 2024-11-17 12.08.15.png

パイプラインの構成で、設定を書いていきます。
注意点として、IAM ロールは、OpenSearch Service の sink 定義と Kinesis Data Streams の source 定義の両方で同じである必要があります。

また、processor でデータの変換定義をしています。
CloudWatch のロググループ名を使用してさまざまなログタイプのデータセットとして識別して OpenSearch のインデックスにドキュメントを書き込みます。
add_entries には、ロググループ名や Kinesis のデータストリーム名などのメタデータを付与できるように指定しています。

version: "2"
kinesis-pipeline:
  source:
    kinesis-data-streams:
      acknowledgments: true
      codec:
        # Kinesisレコードが集約されているかどうかに基づいて、json、newline、またはndjsonコーデックを選択できます。
        # JSONコーデックは、ネストされたCloudWatchイベントを個々のログエントリに解析し、OpenSearchにドキュメントとして書き込みます。
        json:
          key_name: "logEvents"
          # これらのキーには、CloudWatchサブスクリプションフィルターによって送信されたメタデータが含まれています。
          # 個々のログイベントに加えて:
          # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
          # include_keys: [ 'owner', 'logGroup', 'logStream' ]
        newline:
      streams:
        - stream_name: "kinesis-data-streams-to-open-search"
          # ストリームの開始から取り込みを開始する場合はこれを有効にします。
          initial_position: "EARLIEST"
          # checkpoint_interval: "PT5M"
          # CloudWatchの場合、圧縮は常にgzipになりますが、他のソースの場合は異なります:
          compression: "gzip"
      aws:
        # KDSへのアクセスを持つRole ARNを提供します。このロールはosis-pipelines.amazonaws.comとの信頼関係を持つ必要があります。
        sts_role_arn: "arn:aws:iam::<account-id>:role/kds-to-es-integration-pipeline-role"
        # データストリームのリージョンを提供します。
        region: "ap-northeast-1"

  processor:
    - rename_keys:
        entries:
          # CloudWatchのタイムスタンプを観測タイムスタンプとして含めます - ログが生成され、CloudWatchに送信された時間:
          - from_key: "timestamp"
            to_key: "observed_timestamp"
    - date:
        # OSIがログイベントを処理した現在のタイムスタンプを含めます:
        from_time_received: true
        destination: "processed_timestamp"
    - add_entries:
        entries:
          # SS4O共通ログフィールドをサポートします (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
          - key: "cloud/provider"
            value: "aws"
          - key: "cloud/account/id"
            format: "${owner}"
          - key: "cloud/region"
            value: "ap-northeast-1"
          - key: "aws/cloudwatch/log_group"
            format: "${logGroup}"
          - key: "aws/cloudwatch/log_stream"
            format: "${logStream}"
          # data_streamのデフォルト値を含めます:
          - key: "data_stream/namespace"
            value: "default"
          - key: "data_stream/type"
            value: "logs"
          - key: "data_stream/dataset"
            value: "general"
          # このログイベントを含むソースKinesisメッセージに関するメタデータを含めます:
          - key: "aws/kinesis/stream_name"
            value_expression: 'getMetadata("stream_name")'
          - key: "aws/kinesis/partition_key"
            value_expression: 'getMetadata("partition_key")'
          - key: "aws/kinesis/sequence_number"
            value_expression: 'getMetadata("sequence_number")'
          - key: "aws/kinesis/sub_sequence_number"
            value_expression: 'getMetadata("sub_sequence_number")'
    - add_entries:
        entries:
          # ログイベントのコンテキストに基づいてdata_streamフィールドを更新します - この場合、ソース(CloudTrailまたはLambda)によってログイベントを分類します。
          # ビジネスまたはアプリケーションコンテキストによってログを分類するための追加のロジックを追加できます:
          - key: "data_stream/dataset"
            value: "cloudtrail"
            add_when: 'contains(/logGroup, "cloudtrail") or contains(/logGroup, "CloudTrail")'
            overwrite_if_key_exists: true
          - key: "data_stream/dataset"
            value: "lambda"
            add_when: 'contains(/logGroup, "/aws/lambda/")'
            overwrite_if_key_exists: true
          - key: "data_stream/dataset"
            value: "apache"
            add_when: 'contains(/logGroup, "/apache/")'
            overwrite_if_key_exists: true
    # デフォルトのCloudWatchフィールドを削除します。これらはSS4Oフィールドに再マッピングされました:
    - delete_entries:
        with_keys:
          - "logGroup"
          - "logStream"
          - "owner"
    # 非JSONのapacheログを解析するためにGrokパーサーを使用します
    - grok:
        grok_when: '/data_stream/dataset == "apache"'
        match:
          message: ["%{COMMONAPACHELOG_DATATYPED}"]
        target_key: "http"
    # OpenSearchインデックスでのフィールドレベルの検索をサポートするために、ログデータをJSONとして解析しようとします:
    - parse_json:
        # ルートメッセージオブジェクトをaws.cloudtrailに解析して、SS4OログのSS4O標準に一致させます
        source: "message"
        destination: "aws/cloudtrail"
        parse_when: '/data_stream/dataset == "cloudtrail"'
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Lambda関数ログの場合、可能であればルートメッセージオブジェクトをJSONとして解析します - 非JSONのログ関数データを検索可能なフィールドとしてキャプチャするためにGrokサポートを設定することもできます
        source: "message"
        destination: "aws/lambda"
        parse_when: '/data_stream/dataset == "lambda"'
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # 一般ログの場合、可能であればルートメッセージオブジェクトをJSONとして解析します
        source: "message"
        destination: "body"
        parse_when: '/data_stream/dataset == "general"'
        tags_on_failure: ["json_parse_fail"]

  sink:
    - opensearch:
        # AWS OpenSearch Serviceドメインエンドポイントを提供します
        hosts: ["https://****************.ap-northeast-1.es.amazonaws.com"]
        # ログデータをログコンテキストに応じて異なるターゲットインデックスにルーティングします:
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          # ドメインへのアクセスを持つRole ARNを提供します。このロールはosis-pipelines.amazonaws.comとの信頼関係を持つ必要があります。
          # このロールは、上記のKinesisで使用したロールと同じでなければなりません。
          sts_role_arn: "arn:aws:iam::<account-id>:role/kds-to-es-integration-pipeline-role"
          # ドメインのリージョンを提供します。
          region: "ap-northeast-1"
          # シンクがAmazon OpenSearch Serverlessコレクションである場合、'serverless'フラグを有効にします
          serverless: false

上記の設定を書き込んで検証ボタンがあるので教えて構文が問題ないか確認します。

スクリーンショット_2024-11-17_11_23_40.png

OpenSearch ドメインが VPC 内にあるリソースのため、VPC のリソースへ接続できるように VPC アクセスを設定します。

スクリーンショット_2024-11-17_11_24_27.png

問題が発生したときにトラブルシューティングしやすいように、パイプライン用の CloudWatch ログ記録を設定しておきます。

スクリーンショット 2024-11-17 11.24.35.png

ステータスがアクティブになれば、パイプラインの構築は完了です。

スクリーンショット 2024-11-17 11.35.20.png

6. OpenSearch Dashboard を見てみた

これで全体構成のすべてのリソースの準備が完了しました。

Lambda のマネージメントコンソールからテスト実行を使って複数回実行して強制出力させます。

しばらく経つと、OpenSearch Dashboard の Discover から Index を作成できるようになるので作成します。

スクリーンショット 2024-11-17 12.49.37.png

スクリーンショット 2024-11-17 12.50.46.png

スクリーンショット 2024-11-17 12.51.02.png

以下のように 8 件のログが OpenSearch Dashboard 上で見られるようになってました。

スクリーンショット 2024-11-17 12.52.43.png

7. (オプション) 後片付け

検証用途の場合、そのままにしていると OpenSearch や Kinesis Data Streams、踏み台の EC2 インスタンスで高額の料金がかかってしまうため、削除しておきましょう。

  • OpenSearch Service と踏み台 EC2 の削除(npx cdk destroy
  • OpenSearch Integration の削除
  • Kinesis Data Streams の削除

さいごに

Amazon OpenSearch Ingestion が Amazon Kinesis Data Streams からのデータ取り込みが新たにサポートされるようになりました。

これまでは、Kinesis Data Streams のデータを OpenSearch へ投入するには、Lambda で OpenSearch へ書き込みに行くか、Firehose を経由して OpenSearch へ書き込む方法などがあげられます。
しかし、アーキテクチャが複雑になったり、コード書いて保守する必要があったり、複数のリソースを経由することでオーバーヘッドの増加が考えられます。
今回のアップデートで、OpenSearch Integration に直接 Kinesis Data Streams のデータを連携させられるため、運用コストが削減されるメリットや、よりリアルタイムでのデータ連携や分析が可能になったかと思います。

re:Invent 2024 が近づいていることもあり、Kinesis や OpenSearch Service の新たなアップデートも気になります。
この記事が、誰かのお役に立てれば。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.